package cn.itcast.flink.table;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * Flink Table API demo: per-user order statistics over 5-second tumbling event-time windows.
 *
 * <p>A custom source emits one random {@link Order} per loop iteration, sleeping one second
 * between creating and emitting it. The stream is assigned event-time timestamps taken from
 * {@code createTime} and watermarks with a 2-second bounded out-of-orderness, registered as the
 * temporary view {@code t_order}, and aggregated per window and {@code userId} into an order
 * count, a maximum amount and a minimum amount. The result is converted to a retract stream and
 * printed.
 *
 * Author itcast
 * Date 2021/7/30 6:38
 */
public class FlinkTableDemo2 {
    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // 1. Environment: create the stream execution environment and the stream table environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment sEnv = StreamTableEnvironment.create(env);

        // 2. Source: custom source that produces one random Order per iteration.
        DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
            // cancel() is called from a different thread than the one executing run(),
            // so the flag must be volatile for the write to be visible to the loop.
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (isRunning) {
                    // userId in [0, 2], money in [0, 100], createTime = current wall-clock millis.
                    Order order = new Order(
                            UUID.randomUUID().toString(),
                            random.nextInt(3),
                            random.nextInt(101),
                            System.currentTimeMillis());
                    // Throttle to one record per second; the record is emitted after the sleep.
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(order);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // 3. Transformation: assign event-time timestamps from createTime and generate
        //    watermarks that tolerate up to 2 seconds of out-of-orderness.
        DataStream<Order> watermarkedDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((event, timestamp) -> event.getCreateTime())
                );

        // 4. Register the stream as temporary view "t_order", declaring createTime as the
        //    rowtime (event-time) attribute.
        sEnv.createTemporaryView("t_order", watermarkedDS,
                $("orderId"), $("userId"), $("money"), $("createTime").rowtime());

        // 5. Table API query: order count, max amount and min amount per user and window.
        // 5.1 Read from the t_order view.
        Table orders = sEnv.from("t_order")
                // 5.2 Define a 5-second tumbling window on the createTime rowtime attribute.
                .window(Tumble.over(lit(5).second())
                        .on($("createTime"))
                        .as("tumbleWindow"))
                // 5.3 Group by the window and by userId.
                .groupBy($("tumbleWindow"), $("userId"))
                // 5.4 Select userId, order count, max amount, min amount.
                .select($("userId"),
                        $("orderId").count().as("totalCnt"),
                        $("money").max().as("maxMoney"),
                        $("money").min().as("minMoney"));

        // 6. Sink: convert to a retract stream; the Boolean flag is true for an add message
        //    and false for a retract message.
        DataStream<Tuple2<Boolean, Row>> result = sEnv.toRetractStream(orders, Row.class);

        // 7. Print the result to stdout.
        result.print();

        // 8. Submit and run the job.
        env.execute();
    }

    /**
     * Order record produced by the demo source. Accessors and constructors are generated by
     * Lombok ({@code @Data}, {@code @AllArgsConstructor}, {@code @NoArgsConstructor}).
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        // Unique order identifier (a random UUID string in this demo).
        private String orderId;
        // Identifier of the user who placed the order.
        private Integer userId;
        // Order amount.
        private Integer money;
        // Creation time in epoch milliseconds; used as the event-time timestamp.
        private Long createTime;
    }
}
